{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<!--\n",
    "    Licensed to the Apache Software Foundation (ASF) under one\n",
    "    or more contributor license agreements.  See the NOTICE file\n",
    "    distributed with this work for additional information\n",
    "    regarding copyright ownership.  The ASF licenses this file\n",
    "    to you under the Apache License, Version 2.0 (the\n",
    "    \"License\"); you may not use this file except in compliance\n",
    "    with the License.  You may obtain a copy of the License at\n",
    "\n",
    "      http://www.apache.org/licenses/LICENSE-2.0\n",
    "\n",
    "    Unless required by applicable law or agreed to in writing,\n",
    "    software distributed under the License is distributed on an\n",
    "    \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
    "    KIND, either express or implied.  See the License for the\n",
    "    specific language governing permissions and limitations\n",
    "    under the License.\n",
    "-->\n",
    "\n",
    "# Interactive Beam Running on Flink"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import apache_beam as beam\n",
    "from apache_beam.runners.interactive import interactive_runner\n",
    "from apache_beam.runners.portability import flink_runner\n",
    "\n",
    "# Build the pipeline on an InteractiveRunner that is given a FlinkRunner\n",
    "# as its underlying runner.\n",
    "p = beam.Pipeline(\n",
    "    interactive_runner.InteractiveRunner(\n",
    "        underlying_runner=flink_runner.FlinkRunner()))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Two independent branches over the same source of integers 0..9:\n",
    "# one maps each element to its square, the other to its cube.\n",
    "init_pcoll = p | beam.Create(range(10))\n",
    "squares = init_pcoll | 'Square' >> beam.Map(lambda n: n * n)\n",
    "cubes = init_pcoll | 'Cube' >> beam.Map(lambda n: n ** 3)\n",
    "\n",
    "# Run the pipeline and wait on the returned result before reading from it.\n",
    "result = p.run()\n",
    "result.wait_until_finish()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Display what the pipeline result holds for the `squares` PCollection.\n",
    "result.get(squares)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class AverageFn(beam.CombineFn):\n",
    "  \"\"\"CombineFn that averages its inputs via a (running sum, count) accumulator.\"\"\"\n",
    "\n",
    "  def create_accumulator(self):\n",
    "    \"\"\"Return the empty accumulator: a sum of 0.0 and a count of 0.\"\"\"\n",
    "    return (0.0, 0)\n",
    "\n",
    "  def add_input(self, sum_count, input):\n",
    "    \"\"\"Add one input to the running sum and bump the count by one.\"\"\"\n",
    "    total, count = sum_count\n",
    "    return (total + input, count + 1)\n",
    "\n",
    "  def merge_accumulators(self, accumulators):\n",
    "    \"\"\"Merge (sum, count) accumulators by summing each field separately.\"\"\"\n",
    "    partial_sums, partial_counts = zip(*accumulators)\n",
    "    return (sum(partial_sums), sum(partial_counts))\n",
    "\n",
    "  def extract_output(self, sum_count):\n",
    "    \"\"\"Return sum / count, or NaN when the count is zero.\"\"\"\n",
    "    total, count = sum_count\n",
    "    if not count:\n",
    "      return float('NaN')\n",
    "    return total / count"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Extend both branches with a global average computed by AverageFn.\n",
    "average_square = squares | 'Average Square' >> beam.CombineGlobally(AverageFn())\n",
    "average_cube = cubes | 'Average Cube' >> beam.CombineGlobally(AverageFn())\n",
    "\n",
    "# Re-run the extended pipeline and wait on the result, as the first run does,\n",
    "# so that nothing is read back from a job that may still be in progress.\n",
    "result = p.run()\n",
    "result.wait_until_finish()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Display what the pipeline result holds for the `average_square` PCollection.\n",
    "result.get(average_square)"
   ]
  }
 ],
 "metadata": {
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.5rc1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
